{
  "metadata" : {
    "name" : "GraphX",
    "user_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "auto_save_timestamp" : "1970-01-01T01:00:00.000Z",
    "language_info" : {
      "name" : "scala",
      "file_extension" : "scala",
      "codemirror_mode" : "text/x-scala"
    },
    "trusted" : true,
    "customLocalRepo" : null,
    "customRepos" : null,
    "customDeps" : [ "com.fasterxml.jackson.module %% jackson-module-scala % 2.3.3" ],
    "customImports" : null,
    "customArgs" : null,
    "customSparkConf" : null
  },
  "cells" : [ {
    "metadata" : {
      "id" : "1BD94847B63C4394AF1B3AAFABA54F4C"
    },
    "cell_type" : "markdown",
    "source" : "# Analyse GitHub archives using GraphX"
  }, {
    "metadata" : {
      "id" : "1C6EE6059BED491E873A4D067054036E"
    },
    "cell_type" : "markdown",
    "source" : "_Trying to detect open source communies based on contributions_"
  }, {
    "metadata" : {
      "id" : "9D487F6FAEDE4C6B8976FAABA268FCAD"
    },
    "cell_type" : "markdown",
    "source" : "## Setup the environment to work with GraphX and Json data "
  }, {
    "metadata" : {
      "id" : "C74CFA71CC0449DFB58318F3C3E6ED9B"
    },
    "cell_type" : "markdown",
    "source" : "### Import some github data"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "B53B1CCBC7BA43008467D8A9061DAB9A"
    },
    "cell_type" : "code",
    "source" : "import sys.process._\nif (!new java.io.File(\"/tmp/github.json\").exists) {\n  new java.net.URL(\"http://data.githubarchive.org/2015-01-01-15.json.gz\")  #> new java.io.File(\"/tmp/github.json.gz\") !!\n  \n  Seq(\"gunzip\", \"-f\", \"/tmp/github.json.gz\")!!\n}\n",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "B6804BE638AE464B98B2806D4C891B3F"
    },
    "cell_type" : "markdown",
    "source" : "### **The size of the data**"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "DABBD04745134831822739BFF782EF9F"
    },
    "cell_type" : "code",
    "source" : ":sh du -h /tmp/github.json",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "2DA4D8DC9A01453ABF0832C1A461AC71"
    },
    "cell_type" : "markdown",
    "source" : "## First some Spark manipulation "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "58F5BF50D84345C193F523A533DEC6E9"
    },
    "cell_type" : "code",
    "source" : "val raw = sparkContext.textFile(\"/tmp/github.json\")",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "5C9D921E0A8744A18015E46326C4EB34"
    },
    "cell_type" : "markdown",
    "source" : "### The number of lines in the file"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "14CFF48EA8F94EC48BA1E896BA7B0DD3"
    },
    "cell_type" : "code",
    "source" : "raw.count",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "D07A81FBF86A4D7FAA6C14469730D4C4"
    },
    "cell_type" : "markdown",
    "source" : "### Convert line to JSON _(simple Map of Maps)_"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "0060C72E46054322B4C6F8AB8AF21F9E"
    },
    "cell_type" : "code",
    "source" : "val json = raw.mapPartitions{ lines => \n  import com.fasterxml.jackson._\n  import com.fasterxml.jackson.core._\n  import com.fasterxml.jackson.databind._\n  import com.fasterxml.jackson.module.scala._\n  val mapper = new ObjectMapper()\n  mapper.registerModule(DefaultScalaModule)\n  lines.map(x => mapper.readValue(x, classOf[Map[String,Any]]))\n}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "ADFD8F12242A477B8780C625D874CF79"
    },
    "cell_type" : "markdown",
    "source" : "### Let's look at the two first rows"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "7FF0D47D2A894CCD823F0DB580FB7B26"
    },
    "cell_type" : "code",
    "source" : "json.take(2).toList",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "1407793FFDD64B01802311C1B07F6DAB"
    },
    "cell_type" : "markdown",
    "source" : "## The graph part "
  }, {
    "metadata" : {
      "id" : "85AC2572551D423BBC7C44F34FBB92BD"
    },
    "cell_type" : "markdown",
    "source" : "We could use the *actors* and the *repos* as vertices, and use the *event* as relationship between them.\n\nThere are *id*s for actor and repo, so we can directly use them in GraphX as such."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "9CE31BE9273443A68B3C911DD4D6A70D"
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.rdd._\nimport org.apache.spark.graphx._",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "0A0CD4DBB16B4E9E9CDCC65E731A006D"
    },
    "cell_type" : "markdown",
    "source" : "### RDD vertices {Actors U Repos}"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "05A8F9E441794BFC84FE96B82EF901C3"
    },
    "cell_type" : "code",
    "source" : "val actors:RDD[(VertexId, (Short, String))] = json.map{ x => \n  val actor = x(\"actor\").asInstanceOf[Map[String, Any]]\n  val id = actor(\"id\").toString.toLong\n  val login = actor(\"login\").toString\n  (id, (0, login))\n}\nval repos:RDD[(VertexId, (Short, String))] = json.map{ x => \n  val repo = x(\"repo\").asInstanceOf[Map[String, Any]]\n  val id = repo(\"id\").toString.toLong\n  val name = repo(\"name\").toString\n  (id, (1, name))\n}\nval vertices:RDD[(VertexId, (Short, String))] = actors union repos",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "6531070266A54335AC78EBAF08D8675C"
    },
    "cell_type" : "markdown",
    "source" : "### RDD of Edges "
  }, {
    "metadata" : {
      "id" : "5FE65B28EAD64927848DA9EF516D7C16"
    },
    "cell_type" : "markdown",
    "source" : "Now an **RDD** with the edges (including reverse ones, that is from repo to actor)"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "A07CE023ADC24EA48A240D2F2F1BB3BD"
    },
    "cell_type" : "code",
    "source" : "// None → repo to actor\n// Some(\"PushEvent\") → actor pushed on repo\nval edges:RDD[Edge[Option[String]]] = json.flatMap { x =>\n  val event = x.get(\"type\").map(_.toString)\n  val actor = x(\"actor\").asInstanceOf[Map[String, Any]](\"id\").toString.toLong\n  val repo = x(\"repo\").asInstanceOf[Map[String, Any]](\"id\").toString.toLong\n  List(Edge(actor, repo, event), Edge(actor, repo, None))\n}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "6F06090326054C0C81B5059015FEA937"
    },
    "cell_type" : "markdown",
    "source" : "### Graph"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "34BAB4C44EAA47F7885F208100371595"
    },
    "cell_type" : "code",
    "source" : "val graph = Graph(vertices, edges)",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "53C72E55FF2644878ACD3DD0DDB4DAEF"
    },
    "cell_type" : "markdown",
    "source" : "## Open source working community "
  }, {
    "metadata" : {
      "id" : "986F138BE8234FAA8B3D3E9A47952BFA"
    },
    "cell_type" : "markdown",
    "source" : "A very very simple example of such extraction would simply be to extract the connected components "
  }, {
    "metadata" : {
      "id" : "04FE2C729AAE49508DC800DF559FFF2B"
    },
    "cell_type" : "markdown",
    "source" : "So that, a component is the actors and repos having connections between them but not with other actor or repos. A connection being a collaboration."
  }, {
    "metadata" : {
      "id" : "9EABAC5F9D16484288E6D05F15DB9AEB"
    },
    "cell_type" : "markdown",
    "source" : "### Computing connected components "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "362F362DBB7D469C83468DD57B839177"
    },
    "cell_type" : "code",
    "source" : "val cc = graph.connectedComponents",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "D36E686824E54AF18AF80596EEC4DF23"
    },
    "cell_type" : "markdown",
    "source" : "The `cc` variable is the original graph but vertives' payload/properties is only the cluster to which is belongs. The cluster is characterized by the smallest `VertexId` in the cluster."
  }, {
    "metadata" : {
      "id" : "99C104F8FFF544B98C27658691D4B0D3"
    },
    "cell_type" : "markdown",
    "source" : "#### Number of connected components "
  }, {
    "metadata" : {
      "id" : "DFB395F8E0604265B2523730559A6017"
    },
    "cell_type" : "markdown",
    "source" : "Computing the number of clusters can easily be done by counting the number of distinct `payload` for the vertices."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "7AB46A2F7DC74B95A1B362D48A2BFC9D"
    },
    "cell_type" : "code",
    "source" : "<strong style=\"color: red\">{cc.vertices.map(_._2).distinct.count}</strong>",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "19F67CE9341D4D6095E0B92A904B330C"
    },
    "cell_type" : "markdown",
    "source" : "### Clusters by language "
  }, {
    "metadata" : {
      "id" : "702A2211573A4EBF9054BF0C82636648"
    },
    "cell_type" : "markdown",
    "source" : "We can try to concentrate our analysis to specific languages, since we don't have the language information in the events data (we need extra call to the GitHub API for that) we'll take a naive approach, that is, **we'll only consider the repo having the language in their name** -- albeit it's not 100% safe."
  }, {
    "metadata" : {
      "id" : "89A3BF69466A4735A8CB6A75E0911565"
    },
    "cell_type" : "markdown",
    "source" : "#### Utility functions"
  }, {
    "metadata" : {
      "id" : "DCF3EE37884D446BA78A9FF9BD77B993"
    },
    "cell_type" : "markdown",
    "source" : "The following function compute retrieves the cluster for a given cluster."
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "C323E0F4938646A084635FE63079CC23"
    },
    "cell_type" : "code",
    "source" : "import org.apache.spark.SparkContext._\ndef cluster(lgg:String) = {\n  // collect all repos for the language `lgg`\n  val lggRepos:List[(VertexId, (Short, String))] = vertices.filter { x => \n                    x._2._1 /*vertex type*/ == 1 /*repo*/ && \n                    x._2._2/*repo name*/.toLowerCase.contains(lgg) //here we SHOULD exclude the prefix of '/'\n                }.collect().toList\n  // keep only the set\n  // ***** IN A CLUSTER →→→ THIS NEEDS TO BE A BROADCAST VARIABLE *****wwwwzeqc\n  val lggRepoIds:List[Long] = lggRepos.map(_._1).distinct\n  // clusters \"id\" for these repos → BROADCAST\n  val clusterIds:List[Long] = cc.vertices.filter(x => lggRepoIds.contains(x._1))\n                            .map(_._2)\n                            .collect()\n                            .toList\n  // return the vertices being clustered sorted by decreasing cardinality\n  val clusters:List[(Long, Iterable[Long])] = cc.vertices.filter{ x => clusterIds.contains(x._2) }\n                 .groupBy(_._2)\n                 .mapValues(_.map(_._1))\n                 .collect().toList\n                 .sortBy(_._2.size)\n                 .reverse\n  clusters\n}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "32AFF7E9306642FD8B70D7E178D26A74"
    },
    "cell_type" : "markdown",
    "source" : "Shows the list of repos and actors included in the given cluster"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "F0A0835BF1244156B1F683EBD3F1DF20"
    },
    "cell_type" : "code",
    "source" : "def showCluster(lgg:String, clusterIds:List[Long]) = {\n  val c = graph.vertices\n               .filter(x => clusterIds.contains(x._1))\n               .collect().toList\n\n  <div>\n  <p><strong>Repos</strong></p>\n  <ul>{ \n  c.collect { case (x, (1, r)) =>\n           //show the repo\n           val t = if (r.toLowerCase.contains(lgg)) <strong style=\"color: red;\">{r}</strong> else r\n             <li><a href={\"http://github.com/\"+r}>{t}</a></li> \n          }\n  }</ul>\n  <p><strong>Users</strong></p>\n  <ul>{ \n  c.collect { case (x, (0, n)) =>\n           //show the repo\n             <li><a href={\"http://github.com/\"+n}>{n}</a></li> \n          }\n  }</ul>\n  </div>\n}",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "12942FF6A969429B935BD80C0640FB34"
    },
    "cell_type" : "markdown",
    "source" : "## Javascript"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "E44CB7C7AE6B4ADF805A0EA2F832B5BA"
    },
    "cell_type" : "code",
    "source" : "val js = cluster(\"js\")",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "26B903CE8BF04E298C1C03B785D6A0EF"
    },
    "cell_type" : "markdown",
    "source" : "**Let's look at the 3 biggest clusters**"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "DD7B69FE151E46808E82363C99330E59"
    },
    "cell_type" : "code",
    "source" : "layout(3, js.take(3).map(r => html(showCluster(\"js\", r._2.toList))))",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "collapsed" : false,
      "id" : "03F1E192F3434F5BA38E306A8E27DBBA"
    },
    "cell_type" : "markdown",
    "source" : "## Scala"
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "A69C37F84AF6440A94648BF60C85F1E4"
    },
    "cell_type" : "code",
    "source" : "val scala = cluster(\"scala\")",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "C1DC837AFF744CB78B914B79BC5AD157"
    },
    "cell_type" : "code",
    "source" : "layout(4, scala.map(r => html(showCluster(\"scala\", r._2.toList))))\n",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "id" : "4B4A1BEA8AFB44B997438FF6DC6A163F"
    },
    "cell_type" : "markdown",
    "source" : "## Spark ^^ "
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "221C29C0EAE043389A22E322ADD833D1"
    },
    "cell_type" : "code",
    "source" : "val spark = cluster(\"spark\")",
    "outputs" : [ ]
  }, {
    "metadata" : {
      "trusted" : true,
      "input_collapsed" : false,
      "collapsed" : true,
      "id" : "36223032EA7B46C3A019E988400E807D"
    },
    "cell_type" : "code",
    "source" : "layout(4, spark.map(r => html(showCluster(\"spark\", r._2.toList))))",
    "outputs" : [ ]
  } ],
  "nbformat" : 4
}